package com.xiaofan.java;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import sun.java2d.pipe.SpanShapeRenderer;

import java.util.Properties;

/**
 * Flink streaming job that reads text lines from a socket and writes them to the Kafka topic
 * {@code flink_kafka_source_A0004} through a {@link FlinkKafkaProducer011} created with
 * {@code Semantic.EXACTLY_ONCE}.
 * <p>
 * List topics: bin/kafka-topics.sh --list --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181
 * <p>
 * Create the topic: bin/kafka-topics.sh --create --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --replication-factor 2 --partitions 3 --topic flink_kafka_source_A0004
 * <p>
 * Delete the topic: bin/kafka-topics.sh --delete --zookeeper 192.168.1.23:2181,192.168.1.24:2181,192.168.1.25:2181 --topic flink_kafka_source_A0004
 * <p>
 * Produce data: bin/kafka-console-producer.sh --broker-list 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --topic flink_kafka_source_A0004
 * <p>
 * Consume data: bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091 --from-beginning --topic flink_kafka_source_A0004
 * <p>
 * Run with Flink on YARN: bin/flink run -m yarn-cluster /home/hadoop/fanjh/project/FlinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar
 */
public class StreamingKafkaSink_A0004 {

    /** Name under which the job is submitted. */
    private static final String JOB_NAME = "StreamingKafkaSink_A0004";

    /** Host and port of the socket that supplies the input lines. */
    private static final String SOURCE_HOST = "192.168.1.27";
    private static final int SOURCE_PORT = 9999;

    /** Kafka topic the lines are written to, and the brokers used to reach it. */
    private static final String SINK_TOPIC = "flink_kafka_source_A0004";
    private static final String BOOTSTRAP_SERVERS = "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091";

    /** Producer transaction timeout: 15 minutes, expressed in milliseconds. */
    private static final int TRANSACTION_TIMEOUT_MS = 15 * 60 * 1000;

    /**
     * Builds the socket-to-Kafka pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        configureCheckpointing(env);

        // A state backend can be configured here if needed, e.g.:
        // env.setStateBackend(new RocksDBStateBackend("hdfs://mini1:9000/com.xiaofan.flink/checkpoints", true));

        DataStreamSource<String> socketLines = env.socketTextStream(SOURCE_HOST, SOURCE_PORT);
        socketLines.addSink(newExactlyOnceProducer()).name("kafkaSink");

        env.execute(JOB_NAME);
    }

    /** Enables checkpointing on {@code env} and applies the checkpoint settings used by this job. */
    private static void configureCheckpointing(StreamExecutionEnvironment env) {
        env.enableCheckpointing(5000);

        CheckpointConfig checkpoints = env.getCheckpointConfig();
        checkpoints.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpoints.setMinPauseBetweenCheckpoints(500);
        checkpoints.setCheckpointTimeout(60000);
        checkpoints.setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoints when the job is cancelled.
        checkpoints.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }

    /** Creates the Kafka producer sink for {@link #SINK_TOPIC} with exactly-once semantics. */
    private static FlinkKafkaProducer011<String> newExactlyOnceProducer() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);

        // Option 1 (used here): set the transaction timeout on the FlinkKafkaProducer011 side.
        // Option 2 (not used): raise the maximum transaction timeout on the Kafka side instead
        // (presumably the broker setting transaction.max.timeout.ms — confirm against the Kafka docs).
        producerConfig.setProperty("transaction.timeout.ms", String.valueOf(TRANSACTION_TIMEOUT_MS));

        KeyedSerializationSchemaWrapper<String> schema =
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());
        return new FlinkKafkaProducer011<>(
                SINK_TOPIC, schema, producerConfig, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}
